Skip to content

Commit e7ce471

Browse files
authored
feat(spanner): support for Multiplexed Session Partitioned Ops (#2252)
* feat(spanner): support for Multiplexed Session Partitioned Ops * feat(spanner): add support for snapshot isolation (#2245) This PR contains code changes to add support for option IsolationLevel at the client level and at the transaction level. supported methods are(RW and Blind Write): ``` - writeAtLeastOnce - runTransactionAsync - runTransaction - getTransaction - async getTransaction(from transaction runner class) ``` * refactor * refactor
1 parent e42caea commit e7ce471

File tree

9 files changed

+485
-261
lines changed

9 files changed

+485
-261
lines changed

.kokoro/presubmit/node14/system-test-multiplexed-session.cfg

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.kokoro/trampoline_v2.sh

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

observability-test/database.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,7 +1964,7 @@ describe('Database', () => {
19641964
},
19651965
};
19661966

1967-
let fakePool: FakeSessionPool;
1967+
let fakeSessionFactory: FakeSessionFactory;
19681968
let fakeSession: FakeSession;
19691969
let fakePartitionedDml = new FakeTransaction(
19701970
{} as google.spanner.v1.TransactionOptions.PartitionedDml
@@ -1974,14 +1974,17 @@ describe('Database', () => {
19741974
let beginStub;
19751975

19761976
beforeEach(() => {
1977-
fakePool = database.pool_;
1977+
fakeSessionFactory = database.sessionFactory_;
19781978
fakeSession = new FakeSession();
19791979
fakePartitionedDml = new FakeTransaction(
19801980
{} as google.spanner.v1.TransactionOptions.PartitionedDml
19811981
);
19821982

19831983
getSessionStub = (
1984-
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
1984+
sandbox.stub(
1985+
fakeSessionFactory,
1986+
'getSessionForPartitionedOps'
1987+
) as sinon.SinonStub
19851988
).callsFake(callback => {
19861989
callback(null, fakeSession);
19871990
});
@@ -2077,7 +2080,7 @@ describe('Database', () => {
20772080
beginStub.callsFake(callback => callback(fakeError));
20782081

20792082
const releaseStub = (
2080-
sandbox.stub(fakePool, 'release') as sinon.SinonStub
2083+
sandbox.stub(fakeSessionFactory, 'release') as sinon.SinonStub
20812084
).withArgs(fakeSession);
20822085

20832086
database.runPartitionedUpdate(QUERY, async (err, rowCount) => {
@@ -2124,7 +2127,7 @@ describe('Database', () => {
21242127

21252128
it('session released on transaction end', done => {
21262129
const releaseStub = (
2127-
sandbox.stub(fakePool, 'release') as sinon.SinonStub
2130+
sandbox.stub(fakeSessionFactory, 'release') as sinon.SinonStub
21282131
).withArgs(fakeSession);
21292132

21302133
database.runPartitionedUpdate(QUERY, async () => {

src/database.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2945,7 +2945,7 @@ class Database extends common.GrpcServiceObject {
29452945
...this._traceConfig,
29462946
};
29472947
return startTrace('Database.runPartitionedUpdate', traceConfig, span => {
2948-
this.pool_.getSession((err, session) => {
2948+
this.sessionFactory_.getSessionForPartitionedOps((err, session) => {
29492949
if (err) {
29502950
setSpanError(span, err);
29512951
span.end();
@@ -2976,21 +2976,21 @@ class Database extends common.GrpcServiceObject {
29762976
}
29772977
transaction.begin(err => {
29782978
if (err) {
2979-
this.pool_.release(session!);
2979+
this.sessionFactory_.release(session!);
29802980
callback!(err, 0);
29812981
return;
29822982
}
29832983

29842984
transaction.runUpdate(query, (err, updateCount) => {
29852985
if (err) {
29862986
if (err.code !== grpc.status.ABORTED) {
2987-
this.pool_.release(session!);
2987+
this.sessionFactory_.release(session!);
29882988
callback!(err, 0);
29892989
return;
29902990
}
29912991
this._runPartitionedUpdate(session, query, callback);
29922992
} else {
2993-
this.pool_.release(session!);
2993+
this.sessionFactory_.release(session!);
29942994
callback!(null, updateCount);
29952995
return;
29962996
}

src/session-factory.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ export interface SessionFactoryInterface {
5656
*/
5757
getSession(callback: GetSessionCallback): void;
5858

59+
/**
60+
* When called returns a session for paritioned dml.
61+
*
62+
* @name SessionFactoryInterface#getSessionForPartitionedOps
63+
* @param {GetSessionCallback} callback The callback function.
64+
*/
65+
getSessionForPartitionedOps(callback: GetSessionCallback): void;
66+
5967
/**
6068
* When called returns the pool object.
6169
*
@@ -97,6 +105,7 @@ export class SessionFactory
97105
multiplexedSession_: MultiplexedSessionInterface;
98106
pool_: SessionPoolInterface;
99107
isMultiplexed: boolean;
108+
isMultiplexedPartitionOps: boolean;
100109
constructor(
101110
database: Database,
102111
name: String,
@@ -117,6 +126,11 @@ export class SessionFactory
117126
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true'
118127
? (this.isMultiplexed = true)
119128
: (this.isMultiplexed = false);
129+
// set the isMultiplexedPartitionedOps property to true if multiplexed session is enabled for paritioned ops, otherwise set the property to false
130+
this.isMultiplexedPartitionOps =
131+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS === 'true' &&
132+
process.env.GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS ===
133+
'true';
120134
// Multiplexed sessions should only be created if its enabled.
121135
if (this.isMultiplexed) {
122136
this.multiplexedSession_.on('error', this.emit.bind(database, 'error'));
@@ -143,6 +157,23 @@ export class SessionFactory
143157
);
144158
}
145159

160+
/**
161+
* Retrieves a session for partitioned operations, selecting the appropriate session type
162+
* based on whether multiplexed sessions are enabled.
163+
*
164+
* If multiplexed sessions are enabled for partitioned ops this methods delegates the request to `getSession()`, which returns
165+
* either a multiplexed session or a regular session based on the configuration.
166+
*
167+
* If the multiplexed sessions are disabled, a session is retrieved from the regular session pool.
168+
*
169+
* @param {GetSessionCallback} callback The callback function.
170+
*/
171+
getSessionForPartitionedOps(callback: GetSessionCallback): void {
172+
this.isMultiplexedPartitionOps
173+
? this.getSession(callback)
174+
: this.pool_.getSession(callback);
175+
}
176+
146177
/**
147178
* Returns the regular session pool object.
148179
*
@@ -154,9 +185,11 @@ export class SessionFactory
154185
}
155186

156187
/**
157-
* Releases a session back to the session pool.
188+
* Releases a regular session back to the session pool.
189+
*
190+
* This methods does not release a multiplexed session.
158191
*
159-
* This method returns a session to the pool after it is no longer needed.
192+
* It returns a session to the pool after it is no longer needed.
160193
* It is a no-op for multiplexed sessions.
161194
*
162195
* @param {Session} session - The session to be released. This should be an instance of `Session` that was
@@ -165,7 +198,7 @@ export class SessionFactory
165198
* @throws {Error} If the session is invalid or cannot be released.
166199
*/
167200
release(session: Session): void {
168-
if (!this.isMultiplexed) {
201+
if (!session.metadata?.multiplexed) {
169202
this.pool_.release(session);
170203
}
171204
}

0 commit comments

Comments
 (0)